# Standard imports: NumPy/pandas for data handling, matplotlib for plotting,
# scikit-learn for synthetic data and evaluation, plotly for 3-D figures.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statistics import mean

from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

import plotly.io as pio

# Use pio to ensure plotly plots render in Quarto
pio.renderers.default = 'notebook_connected'
Function to Plot Training Data and Regression Plane
Code
import plotly.graph_objects as go


def plot_3d_regression(model, model_name, data_type='linear'):
    """Fit *model* on a synthetic 2-feature dataset and display a 3-D plot of
    the training points together with the learned regression surface.

    Parameters
    ----------
    model : object
        Any regressor exposing ``fit(X, y)`` and ``predict(X)``.
    model_name : str
        Label used in the plot title.
    data_type : str, optional
        ``'linear'`` (default), ``'non-linear'`` or ``'ugly'`` — controls how
        much non-linear structure and extra noise is added to the target.
    """
    # BUG FIX: normalize case once.  The original lower-cased the
    # 'non-linear' comparison but compared 'ugly' case-sensitively, so
    # data_type='Ugly' silently fell through to linear data.
    kind = data_type.lower()

    # Base linear data with mild noise; fixed seed for reproducibility.
    X, y = make_regression(
        n_samples=300,      # Number of samples
        n_features=2,       # Number of features
        noise=15,           # Add noise to make it realistic
        random_state=42,    # Set seed for reproducibility
    )

    # Smooth non-linear structure on top of the linear signal.
    if kind == 'non-linear':
        y = y + 500 * np.sin(X[:, 0]) + 250 * np.cos(X[:, 1])

    # Heavy non-linearities, feature interactions and extra noise.
    if kind == 'ugly':
        y = y + 500 * np.sin(X[:, 0]) * np.cos(X[:, 1]) \
            + 300 * (X[:, 0] ** 2) \
            - 200 * np.exp(-X[:, 1] ** 2)
        # Random feature interactions
        y = y + 100 * (X[:, 0] * X[:, 1])
        # Random noise to make it "uglier"
        y = y + np.random.normal(0, 150, size=y.shape)

    model.fit(X, y)

    # Dense grid spanning the observed feature range for the surface.
    x_grid, y_grid = np.meshgrid(
        np.linspace(min(X[:, 0]), max(X[:, 0]), 100),
        np.linspace(min(X[:, 1]), max(X[:, 1]), 100),
    )
    grid = np.vstack((x_grid.flatten(), y_grid.flatten())).T
    predictions = model.predict(grid)

    # Training data as a 3-D scatter.
    scatter = go.Scatter3d(
        x=X[:, 0], y=X[:, 1], z=y,
        mode='markers',
        marker=dict(color='blue', size=5),
        name='Training Data',
    )

    # Fitted model as a semi-transparent surface over the grid.
    surface = go.Surface(
        x=x_grid, y=y_grid,
        z=predictions.reshape(x_grid.shape),
        opacity=0.5,
        colorscale='Viridis',
        name='Regression Surface',
    )

    fig = go.Figure(data=[scatter, surface])
    fig.update_layout(
        title=f'Training Data and Regression Surface for {model_name}',
        scene=dict(
            xaxis_title='Feature 1',
            yaxis_title='Feature 2',
            zaxis_title='Target',
        ),
    )
    fig.show()
Regression Algorithms
OLS Regression
class ols_regression():
    """Ordinary least squares linear regression solved in closed form.

    After ``fit``, the learned weights are exposed as ``coef`` (one weight
    per feature) and ``bias`` (the intercept).
    """

    def __init__(self):
        pass

    def fit(self, X, y):
        '''Fit the regression to the X data via the OLS normal equations.'''
        # Prepend a column of 1s so the intercept is learned jointly with
        # the feature weights.
        X = np.hstack((np.ones((X.shape[0], 1)), X))
        # Solve (X^T X) w = X^T y.  np.linalg.solve is numerically more
        # stable than explicitly forming the inverse (the original used
        # inv()); it raises the same LinAlgError for a singular system.
        ols = np.linalg.solve(X.T.dot(X), X.T.dot(y))
        self.coef = ols[1:]
        self.bias = ols[0]

    def predict(self, X):
        '''Predict new data with the trained coefficients and bias.'''
        # Accept a 1-D feature vector for single-feature models.
        if X.ndim == 1:
            X = X.reshape(-1, 1)
        # Keep the predictions on the instance (original behavior) and
        # return them.
        self.predictions = X.dot(self.coef) + self.bias
        return self.predictions
class GDRegression():
    """Linear regression trained with (mini-batch) gradient descent.

    Parameters
    ----------
    epochs : int
        Number of gradient-descent updates to perform.
    eta : float
        Learning rate.
    """

    def __init__(self, epochs, eta):
        '''Initialize the Gradient Descent Regression Class'''
        self.epochs = epochs
        self.eta = eta

    def fit(self, X, y, batch_size="TBD"):
        '''Train the Gradient Descent Regression Class.

        ``batch_size`` defaults to the sentinel "TBD", meaning full-batch
        descent; ``None`` is also accepted and means the same thing.
        '''
        if batch_size == "TBD" or batch_size is None:
            batch_size = X.shape[0]
        # Random initialization for the bias and coefficients.
        bias = np.random.random()
        coef = np.random.random(X.shape[1])
        # One update per epoch (the original loop variable shadowed the
        # builtin `iter`).
        for _ in range(self.epochs):
            # Sample a batch of rows without replacement.
            indices = np.random.choice(
                X.shape[0], size=min(batch_size, len(y)), replace=False
            )
            X_batch = X[indices]
            y_batch = y[indices]
            # Predictions for the current batch.
            y_hat = X_batch.dot(coef) + bias
            # Gradients of the MSE loss w.r.t. bias and coefficients
            # (np.sum replaces the builtin sum for the same value at C speed).
            grad_b = 2 / X_batch.shape[0] * np.sum(y_hat - y_batch)
            grad_c = 2 / X_batch.shape[0] * X_batch.T.dot(y_hat - y_batch)
            # Gradient step.
            bias = bias - grad_b * self.eta
            coef = coef - grad_c * self.eta
        # Finalize the learned parameters.
        self.bias = bias
        self.coef = coef

    def predict(self, X):
        '''Predict new data given the learned bias and coef.'''
        predictions = X.dot(self.coef) + self.bias
        return predictions
class KNNRegressor():
    """k-nearest-neighbors regression using Euclidean distance."""

    def __init__(self, n_neighbors=5):
        '''Initialize the regressor with a defined number of nearest neighbors'''
        self.n_neighbors = n_neighbors

    def fit(self, X, y):
        '''Train the regressor by storing all X and y data (lazy learner).'''
        self.X = X
        self.y = y

    def predict(self, X):
        '''Predict each test point as the mean target of its nearest
        training neighbors (Euclidean distance).'''
        predictions = []
        for test_point in X:
            # Distance from the test point to every training point.
            distances = np.linalg.norm(self.X - test_point, axis=1)
            # Indices of the n_neighbors closest training points.
            closest_points_indices = np.argsort(distances)[:self.n_neighbors]
            # np.mean replaces statistics.mean (same value, C speed) and the
            # list accumulator avoids the quadratic np.append-in-a-loop.
            predictions.append(np.mean(self.y[closest_points_indices]))
        return np.array(predictions)
class DecisionTreeRegressor():
    """CART-style regression tree that greedily minimizes weighted MSE.

    The fitted tree is stored in ``self.tree`` as nested dicts of the form
    ``{'feature', 'value', 'left', 'right'}`` with float (mean-target) leaves.
    """

    def __init__(self, max_depth=None):
        # Optional depth cap; None grows the tree until leaves are pure.
        self.max_depth = max_depth

    def mse(self, y):
        """Mean squared deviation of y from its own mean."""
        return np.mean((y - np.mean(y)) ** 2)

    def _best_split(self, X, y):
        """Find the lowest weighted-MSE split over all features/values.

        Returns ``(split_value, feature_index, left_targets, right_targets)``
        or all-``None`` when no valid split exists.

        BUG FIX: the search state now lives in locals.  The original stored
        it on ``self`` and returned ``self.left_y``, which was only assigned
        when a split was found — a splitless call crashed with
        AttributeError or silently returned stale results from an earlier
        call.
        """
        best_mse = float('inf')
        best_feature = None
        best_split_value = None
        best_left_y = None
        best_right_y = None

        for feature_num in range(X.shape[1]):
            # Candidate thresholds: every unique value of this feature.
            for value in np.unique(X[:, feature_num]):
                left_index = X[:, feature_num] <= value
                right_index = X[:, feature_num] > value
                left_targets = y[left_index]
                right_targets = y[right_index]
                # Only splits that leave samples on both sides count.
                if len(left_targets) > 0 and len(right_targets) > 0:
                    left_mse = self.mse(left_targets)
                    right_mse = self.mse(right_targets)
                    # Weighted average MSE of the two children.
                    total_average_mse = (
                        left_mse * len(left_targets) / len(y)
                        + right_mse * len(right_targets) / len(y)
                    )
                    if total_average_mse < best_mse:
                        best_mse = total_average_mse
                        best_feature = feature_num
                        best_split_value = value
                        best_left_y = left_targets
                        best_right_y = right_targets

        return best_split_value, best_feature, best_left_y, best_right_y

    def _build_tree(self, X, y, depth=0):
        """Recursively grow the tree; returns a dict node or a float leaf."""
        # Leaf: node is pure or the depth cap is reached.
        if len(np.unique(y)) == 1 or (
            self.max_depth is not None and depth >= self.max_depth
        ):
            return np.mean(y)

        best_split_value, best_feature, left_y, right_y = self._best_split(X, y)
        # Leaf: no valid split available.
        if best_feature is None:
            return np.mean(y)

        # Partition rows and recurse on each side.
        left_index = X[:, best_feature] <= best_split_value
        right_index = X[:, best_feature] > best_split_value
        left_tree = self._build_tree(X[left_index], left_y, depth + 1)
        right_tree = self._build_tree(X[right_index], right_y, depth + 1)
        return {
            'feature': best_feature,
            'value': best_split_value,
            'left': left_tree,
            'right': right_tree,
        }

    def _single_prediction(self, tree, x):
        """Walk one sample down the tree to its leaf value."""
        if isinstance(tree, dict):
            # BUG FIX: use <= to match the training split rule.  The
            # original used <, sending samples that sit exactly on a split
            # value down the wrong branch at predict time.
            if x[tree['feature']] <= tree['value']:
                return self._single_prediction(tree['left'], x)
            return self._single_prediction(tree['right'], x)
        # Leaf node: the stored mean target.
        return tree

    def predict(self, X):
        """Predict a target value for each row of X."""
        return np.array([self._single_prediction(self.tree, x) for x in X])

    def fit(self, X, y):
        """Fit the tree to the training data."""
        self.tree = self._build_tree(X, y)
class RandomForestRegressor():
    """Random forest of weighted-MSE regression trees.

    Each tree is trained on a bootstrap resample of the rows, and every
    split considers only a random ~1/3 subset of the features.  The forest
    prediction is the average over all trees.
    """

    def __init__(self, n_estimators, max_depth=None):
        # Number of trees and optional depth cap for each tree.
        self.n_estimators = n_estimators
        self.max_depth = max_depth

    def mse(self, y):
        """Mean squared deviation of y from its own mean."""
        return np.mean((y - np.mean(y)) ** 2)

    def _best_split(self, X, y):
        """Best weighted-MSE split over a random subset of ~1/3 of features.

        Returns ``(split_value, feature_index, left_targets, right_targets)``
        — the feature index refers to the ORIGINAL feature numbering — or
        all-``None`` when no valid split exists.

        BUG FIX: search state is kept in locals.  The original stored it on
        ``self`` and returned ``self.left_y``, which crashed or leaked stale
        state from a previous call when no split was found.
        """
        best_mse = float('inf')
        best_feature = None
        best_split_value = None
        best_left_y = None
        best_right_y = None

        # Randomly sample at least one, roughly a third, of the features.
        n_features_to_consider = max(1, X.shape[1] // 3)
        random_feature_indices = np.random.choice(
            X.shape[1], size=n_features_to_consider, replace=False
        )
        feature_subset = X[:, random_feature_indices]

        # `feature_num` maps the subset column back to the original index.
        for i, feature_num in enumerate(random_feature_indices):
            for value in np.unique(feature_subset[:, i]):
                left_index = feature_subset[:, i] <= value
                right_index = feature_subset[:, i] > value
                left_targets = y[left_index]
                right_targets = y[right_index]
                # Only splits that leave samples on both sides count.
                if len(left_targets) > 0 and len(right_targets) > 0:
                    left_mse = self.mse(left_targets)
                    right_mse = self.mse(right_targets)
                    total_average_mse = (
                        left_mse * len(left_targets) / len(y)
                        + right_mse * len(right_targets) / len(y)
                    )
                    if total_average_mse < best_mse:
                        best_mse = total_average_mse
                        best_feature = feature_num
                        best_split_value = value
                        best_left_y = left_targets
                        best_right_y = right_targets

        return best_split_value, best_feature, best_left_y, best_right_y

    def _build_tree(self, X, y, depth=0):
        """Recursively grow one tree; returns a dict node or a float leaf."""
        # Leaf: node is pure or the depth cap is reached.
        if len(np.unique(y)) == 1 or (
            self.max_depth is not None and depth >= self.max_depth
        ):
            return np.mean(y)

        best_split_value, best_feature, left_y, right_y = self._best_split(X, y)
        # Leaf: no valid split available.
        if best_feature is None:
            return np.mean(y)

        left_index = X[:, best_feature] <= best_split_value
        right_index = X[:, best_feature] > best_split_value
        left_tree = self._build_tree(X[left_index], left_y, depth + 1)
        right_tree = self._build_tree(X[right_index], right_y, depth + 1)
        return {
            'feature': best_feature,
            'value': best_split_value,
            'left': left_tree,
            'right': right_tree,
        }

    def _single_prediction(self, tree, x):
        """Walk one sample down a tree to its leaf value."""
        if isinstance(tree, dict):
            # BUG FIX: <= matches the training split rule (original used <,
            # misrouting samples that sit exactly on a split value).
            if x[tree['feature']] <= tree['value']:
                return self._single_prediction(tree['left'], x)
            return self._single_prediction(tree['right'], x)
        return tree

    def predict(self, X):
        """Average the per-tree predictions for each row of X."""
        predictions = [
            np.array([self._single_prediction(tree, x) for x in X])
            for tree in self.trees
        ]
        # Stack per-tree columns and average across trees.
        all_predictions = np.column_stack(predictions)
        return np.mean(all_predictions, axis=1)

    def fit(self, X, y):
        """Train ``n_estimators`` trees, each on a bootstrap resample."""
        models = []
        for _ in range(self.n_estimators):
            # Sample rows with replacement (bootstrapping).
            random_row_indices = np.random.choice(len(X), len(X), replace=True)
            subset_X = X[random_row_indices]
            subset_y = y[random_row_indices]
            models.append(self._build_tree(subset_X, subset_y))
        self.trees = models
class GradientBoostingRegressor:
    """Gradient boosting for regression: a constant base prediction plus a
    sum of shallow trees fit sequentially to residuals, each shrunk by the
    learning rate.  Requires the file-local ``DecisionTreeRegressor``.
    """

    def __init__(self, n_estimators=100, learning_rate=0.1, max_depth=3):
        # Number of boosting rounds, shrinkage factor, and per-tree depth.
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth

    def fit(self, X, y):
        """Fit ``n_estimators`` trees, each on the current residuals."""
        # Base prediction is the target mean.  Stored separately so that
        # predict() can start from exactly the same constant.
        self.initial_prediction = np.mean(y)
        self.y_pred = np.full_like(y, self.initial_prediction, dtype=np.float32)
        self.trees = []
        for _ in range(self.n_estimators):
            # Residuals are the negative gradient of the squared loss.
            residuals = y - self.y_pred
            tree = DecisionTreeRegressor(max_depth=self.max_depth)
            tree.fit(X, residuals)
            # Shrunken update toward the residuals.
            self.y_pred += self.learning_rate * tree.predict(X)
            self.trees.append(tree)

    def predict(self, X):
        """Sum the base value and all shrunken tree outputs for each row.

        BUG FIX: start from the stored training base value (``mean(y)``)
        rather than ``np.mean(self.y_pred)`` — the mean of the *final*
        training predictions already contains the trees' average
        contribution, so the original double-counted it and biased every
        prediction.
        """
        y_pred = np.full(X.shape[0], self.initial_prediction, dtype=np.float32)
        for tree in self.trees:
            y_pred += self.learning_rate * tree.predict(X)
        return y_pred